Appearance
分布式事务的实现
逻辑步骤
RocketMQ 实现分布式事务的实现机制是通过事务消息加上事务反查机制
具体步骤:
- 发送半消息(Prepare Message):
- 生产者向 MQ 发送一个特殊的消息(半消息),这个消息不会立即投递给消费者。
- 执行本地事务:
- 生产者发送半消息后,将执行本地事务逻辑(如数据库操作)。这个步骤的成功与否将决定消息是否可被消费。
- 根据本地事务状态提交或回滚消息:
- 如果本地事务成功,生产者将通知 MQ 提交消息,使其对消费者可见。
- 如果本地事务失败,生产者将通知 MQ 回滚消息,消息将被删除不会投递给消费者。
- 事务状态回查:
- 如果 MQ 长时间没有收到关于这个半消息的最终状态(提交或回滚),MQ 将向生产者发送回查消息。生产者需要检查本地事务的状态,并回应 MQ 事务的最终状态。

事务回查机制
一般会有一个定时机制,然后如果长时间没有收到二次确认,会进行事务回查
- 这个是生产者反而是消费者,收到这条消息后,通过本地实现的 checker 接口的 checkLocalTransaction 方法进行检查本地事务是否执行完成,并将结果告知 MQ 服务器。

代码分析
参考: https://juejin.cn/post/6844904099993878536
学习项目地址: https://gitee.com/wuhonglin2/rocketmq-jta-demo?skip_mobile=true
你先看一下这个 gpt 给的一个 demo 示例
假设您需要在电商系统中处理订单支付的场景,其中用户的支付操作需要更新订单状态并扣减库存,这两个操作要么同时成功,要么同时失败。
步骤一:发送半消息:
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.client.exception.MQClientException;
TransactionMQProducer producer = new TransactionMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("order_topic", "TagA", "OrderID001", ("支付订单" + order.getId()).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送半消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);步骤 2: 执行本地事务
这个步骤通常是在事务监听器中实现:
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务(比如数据库操作)
orderService.payOrder(order.getId());
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务状态
String orderId = msg.getKeys();
if (orderService.checkOrderPaid(orderId)) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});步骤 3: 提交或回滚消息
根据本地事务的执行结果,RocketMQ 将处理消息的提交或回滚
项目内容
建表:
-- 积分系统
CREATE DATABASE rocketmq-demo-points;
DROP TABLE IF EXISTS `t_points`;
CREATE TABLE `t_points` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_id` bigint(20) NOT NULL COMMENT '用户ID',
`order_no` bigint(20) NOT NULL COMMENT '订单编号',
`points` int(5) NOT NULL COMMENT '积分',
`remarks` varchar(128) NOT NULL DEFAULT '无' COMMENT '备注',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COMMENT='积分表';
SET FOREIGN_KEY_CHECKS = 1;
-- 订单系统
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (
`order_id` bigint(20) NOT NULL COMMENT '订单编号',
`create_time` datetime NOT NULL COMMENT '创建时间',
`user_id` bigint(20) NOT NULL COMMENT '用户ID',
`amount` decimal(18,2) NOT NULL COMMENT '订单金额',
PRIMARY KEY (`order_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
DROP TABLE IF EXISTS `transaction_log`;
CREATE TABLE `transaction_log` (
`id` varchar(32) NOT NULL COMMENT '事务ID',
`business` varchar(32) NOT NULL COMMENT '业务标识',
`foreign_key` varchar(32) NOT NULL COMMENT '对应业务表中的主键',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='事务日志表';
SET FOREIGN_KEY_CHECKS = 1;这个项目是分为了两个模块,一个是订单模块,一个是积分模块;
你可以从 demo 中看出,一般我们会先发送一条半消息(通过 sendMessageInTransaction )
/**
* 事务消息发送
* @param data
* @param topic
* @return
*/public TransactionSendResult send(String data, String topic) throws MQClientException {
//使用 RocketMQ 的 TransactionMQProducer 来发送事务性消息
Message message = new Message(topic, data.getBytes());
return producer.sendMessageInTransaction(message, null);
}然后,当 TransactionProducer 发送一个事务消息时,TransactionMQProducer 会先存储一个半消息到消息队列,然后调用 TransactionListener 实现的 executeLocalTransaction 方法
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("开始执行本地事务....");
LocalTransactionState state = null;
try {
String body = new String(message.getBody());
TOrder tOrder = JSONObject.parseObject(body, TOrder.class);
// 执行本地事务
orderService.insertOrder(tOrder, message.getTransactionId());
state = LocalTransactionState.COMMIT_MESSAGE;
log.info("本地事务已提交。{}", message.getTransactionId());
}catch (Exception e){
log.error("执行本地事务失败。{}", e.getMessage());
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}事务回查也是在这个类中进行操作的【消息队列(Broker)未在规定时间内收到消息的最终提交(COMMIT)或回滚(ROLLBACK)状态。在这种情况下,Broker 需要确定消息的确切状态,会进行回查操作】
package com.xxx.order.service;
import com.alibaba.fastjson.JSONObject;
import com.xxx.common.domain.TOrder;
import com.xxx.common.domain.TransactionLog;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Created by Sinotn
*
* @Author: libin
* @CreateTime: 2020-10-28 14:07
* @Description: 订单事务监听类
*/
@Component
@Slf4j
public class OrderTransactionListener implements TransactionListener {
@Autowired
private OrderService orderService;
@Autowired
private TransactionLogService transactionLogService;
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("开始执行本地事务....");
LocalTransactionState state = null;
try {
String body = new String(message.getBody());
TOrder tOrder = JSONObject.parseObject(body, TOrder.class);
// 执行本地事务
orderService.insertOrder(tOrder, message.getTransactionId());
state = LocalTransactionState.COMMIT_MESSAGE;
log.info("本地事务已提交。{}", message.getTransactionId());
}catch (Exception e){
log.error("执行本地事务失败。{}", e.getMessage());
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
log.info("开始回查本地事务....");
LocalTransactionState state = null;
String transId = messageExt.getTransactionId();
// 如果本地事务存在,则事务提交成功
TransactionLog transactionLog = transactionLogService.getById(transId);
if (null != transactionLog){
state = LocalTransactionState.COMMIT_MESSAGE;
}else{
state = LocalTransactionState.ROLLBACK_MESSAGE;
}
return state;
}
}即只有当业务操作成功完成时,相关的消息才会被发送到消息队列供消费者处理。这对于需要强一致性的业务流程至关重要
接收者的部分代码
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
log.info("消费者线程监听到消息。");
for (MessageExt messageExt: list){
if (!processor(messageExt)){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
/**
* 消息处理,第3次处理失败后,发送邮件通知人工介入
* @param messageExt
* @return
*/private boolean processor(MessageExt messageExt){
String body = new String(messageExt.getBody());
try{
log.info("消息处理...{}", body);
log.info("开始处理订单数据,准备增加积分....");
TOrder order = JSONObject.parseObject(body, TOrder.class);
pointsService.insert(order);
// 模拟异常
//int k = 1/0;
return true;
}catch (Exception e){
if (messageExt.getReconsumeTimes() >= 3){
log.error("消息重试已达最大次数,将通知业务人员排查问题。{}",messageExt.getMsgId());
// 发送邮件或者报警
// sendMail(messageExt);
return true;
}
return false;
}
}这里看半消息 sendMessageInTransaction 和 executeLocalTransaction 的确是对应关系,不过这个关系是在初始化的时候进行定义的
package com.xxx.order.service;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Created by Sinotn
*
* @Author: libin
* @CreateTime: 2020-10-28 14:05
* @Description: 事务生产
*/
@Component
public class TransactionProducer {
private static final String GROUP_NAME = "order_trans_group";
private TransactionMQProducer producer;
@Autowired
private OrderTransactionListener orderTransactionListener;
//执行任务的线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
@PostConstruct
public void init(){
producer = new TransactionMQProducer(GROUP_NAME);
producer.setNamesrvAddr("192.168.56.105:9876");
producer.setSendMsgTimeout(100);
producer.setExecutorService(executor);
producer.setTransactionListener(orderTransactionListener);
this.start();
}
private void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 事务消息发送
* @param data
* @param topic
* @return
*/
public TransactionSendResult send(String data, String topic) throws MQClientException {
//使用 RocketMQ 的 TransactionMQProducer 来发送事务性消息
Message message = new Message(topic, data.getBytes());
return producer.sendMessageInTransaction(message, null);
}
}